package com.easy.mq.client;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;

import com.easy.mq.config.producer.RocketProducerConfig;
import com.easy.mq.result.RocketProducerMessage;

public final class RocketProducerPool {
	
	private static Map<String,DefaultMQProducer> poolMap = new ConcurrentHashMap<>();

	public static DefaultMQProducer getProducer(RocketProducerConfig mqProducerConfig,RocketProducerMessage message) throws MQClientException {		
	/*	String key = getKey(message);
		if(poolMap.containsKey(key)) {
			return poolMap.get(key);
		}*/
		DefaultMQProducer  defaultMQProducer = createProducerConfig(mqProducerConfig,message);
		//poolMap.put(key, defaultMQProducer);
		return defaultMQProducer;
		
	}

	private static DefaultMQProducer createProducerConfig(RocketProducerConfig mqProducerConfig,RocketProducerMessage message) {
		if(StringUtils.isEmpty(message.getGroup())) {
			mqProducerConfig.setGroup(UUID.randomUUID().toString());
		}
		DefaultMQProducer producer = new DefaultMQProducer(UUID.randomUUID().toString());
		producer.setNamesrvAddr(mqProducerConfig.getAddress());
		//producer.setInstanceName(UUID.randomUUID().toString());
		//producer.setInstanceName(mqProducerConfig.getInstanceName());
		// 生产者每隔30秒从nameserver获取所有topic的最新队列情况
		/*producer.setPollNameServerInteval(
				mqConfig.getPollNameServerInteval() <= 0 ? 30 : mqConfig.getPollNameServerInteval());*/
		// 默认情况下，生产者每隔30秒向所有broker发送心跳
		/*producer.setHeartbeatBrokerInterval(
				mqProducerConfig.getHeartbeatBrokerInterval() <= 0 ? 30 : mqProducerConfig.getHeartbeatBrokerInterval());*/
		try {
			producer.start();
		} catch (MQClientException e) {
			throw new RuntimeException(e);
		}
		return producer;
	}
	
	private  static String getKey(RocketProducerMessage message) {
		String key =  message.getGroup()+":"+message.getTopic();
		return key;
	}

	public static void shutdown(DefaultMQProducer producer,RocketProducerMessage message) {
		if (producer != null) {
			producer.shutdown();
			if(poolMap.containsKey(getKey(message))) {
				DefaultMQProducer defaultMQProducer = poolMap.get(getKey(message));
				if(defaultMQProducer != null) {
					defaultMQProducer.shutdown();
				}
				poolMap.remove(getKey(message));
			}
		}
	}
}
